agentmux_srv\drone\executor\blocks/
agent.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Agent block — one-shot Claude Code spawn driven by an `AgentRef`
5//! and a per-call task template.
6//!
7//! Phase 1.5 PR 2: replaces the original stub
8//! (`{ response: "[stub]" }`) with a real invocation of
9//! `agents::runner::run_agent`. The runner spawns
10//! `claude --print --output-format=stream-json`, drains its stdout
11//! through `ClaudeTranslator`, and produces a structured
12//! `AgentRunResult` that this function flattens into the
13//! snake_case drone-block output shape.
14//!
15//! Block config (`node.data`):
16//!   * `task`         — required. Mustache-style template resolved
17//!                      against `scope.outputs + scope.vars` before
18//!                      spawning.
19//!   * `agent_ref`    — optional. Object matching `AgentRef` shape
20//!                      (camelCase keys): identityId, memoryId,
21//!                      instanceName, workingDirectory. All fields
22//!                      optional; missing = blank agent.
23//!   * `max_turns`    — optional. Hard cap on claude turns.
24//!
25//! Output (snake_case to match other drone blocks — see spec
26//! §4.5):
27//!
28//!   ```json
29//!   {
30//!     "response": "<text>",
31//!     "tokens": { "input": .., "output": .., "cache_creation": .., "cache_read": .. },
32//!     "cost_usd": 0.001,
33//!     "status": "done"
34//!   }
35//!   ```
36//!
37//! Downstream blocks read `{{<this_block_id>.response}}` for the
38//! agent's reply and `{{<this_block_id>.cost_usd}}` for accounting.
39
40use serde_json::{json, Value};
41
42use crate::agents::runner::{run_agent, AgentError};
43use crate::agents::types::{AgentRef, AgentTask};
44use crate::drone::data_flow::ExecutionScope;
45use crate::drone::types::FlowNode;
46
47pub async fn run(node: &FlowNode, scope: &ExecutionScope) -> Result<Value, String> {
48    let task_raw = node
49        .data
50        .get("task")
51        .and_then(|v| v.as_str())
52        .ok_or_else(|| "agent block missing `task`".to_string())?;
53    let prompt = scope.resolve(task_raw);
54
55    let agent_ref: AgentRef = match node.data.get("agent_ref") {
56        Some(v) => serde_json::from_value(v.clone())
57            .map_err(|e| format!("agent block: invalid agent_ref: {e}"))?,
58        None => {
59            // Pre-Phase-1.5 nodes persisted a single `forge_agent_id`
60            // string; the runner can't honor it because identity and
61            // memory are now separate bundles. Surface the legacy data
62            // in the log so the user knows why the agent launches
63            // blank (#835).
64            if let Some(legacy) = node.data.get("forge_agent_id").and_then(|v| v.as_str()) {
65                if !legacy.is_empty() {
66                    tracing::warn!(
67                        block_id = %node.id,
68                        legacy_forge_agent_id = %legacy,
69                        "agent block: legacy `forge_agent_id` ignored — re-pick identity/memory after Phase 1.5 PR 3"
70                    );
71                }
72            }
73            AgentRef::default()
74        }
75    };
76
77    let max_turns = node
78        .data
79        .get("max_turns")
80        .and_then(|v| v.as_u64())
81        .map(|n| n as u32);
82
83    let task = AgentTask {
84        prompt,
85        // The runner doesn't currently use the `context` map (claude
86        // takes the prompt on argv); leave it empty. Phase 2 may
87        // surface scope vars as a `system` message section.
88        context: serde_json::Map::new(),
89        max_turns,
90    };
91
92    // Forward AgentEvents into a local channel and discard for now.
93    // Phase 1.5 PR 3 will re-emit them on the `dronerun:<id>`
94    // broker so the inspector pane can render the live stream.
95    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
96    let handle = run_agent(agent_ref, task, tx).await.map_err(|e| match e {
97        AgentError::Spawn(msg) => format!("agent block: spawn failed: {msg}"),
98        AgentError::InvalidRef(msg) => format!("agent block: invalid agent ref: {msg}"),
99    })?;
100
101    // Drain the event channel concurrently so the runner's sender
102    // can make progress. Drop the events for now — the captured
103    // accumulator from `final_result` is the authoritative output.
104    tokio::spawn(async move {
105        while rx.recv().await.is_some() {}
106    });
107
108    let result = handle
109        .final_result
110        .await
111        .map_err(|e| format!("agent block: runner cancelled: {e}"))?
112        .map_err(|e| format!("agent block: agent run failed: {e}"))?;
113
114    // Manually flatten to snake_case to match other drone block
115    // outputs (the AgentRunResult's serde camelCase is for the IPC
116    // seam with the frontend, NOT for drone templates — see spec
117    // §4.5 NOTE).
118    Ok(json!({
119        "response": result.response,
120        "tokens": {
121            "input": result.tokens.input,
122            "output": result.tokens.output,
123            "cache_creation": result.tokens.cache_creation,
124            "cache_read": result.tokens.cache_read,
125        },
126        "cost_usd": result.cost_usd,
127        "status": "done",
128    }))
129}
130
131#[cfg(test)]
132mod tests {
133    use super::*;
134    use crate::drone::types::NodePosition;
135
136    fn mk_node(data: Value) -> FlowNode {
137        FlowNode {
138            id: "a1".to_string(),
139            position: NodePosition::default(),
140            data,
141            node_type: String::new(),
142        }
143    }
144
145    #[tokio::test]
146    async fn rejects_missing_task() {
147        let node = mk_node(json!({ "kind": "agent" }));
148        let scope = ExecutionScope::new();
149        let err = run(&node, &scope).await.expect_err("must error");
150        assert!(err.contains("missing `task`"), "got: {err}");
151    }
152
153    #[tokio::test]
154    async fn rejects_malformed_agent_ref() {
155        let node = mk_node(json!({
156            "kind": "agent",
157            "task": "hi",
158            "agent_ref": "not an object"
159        }));
160        let scope = ExecutionScope::new();
161        let err = run(&node, &scope).await.expect_err("must error");
162        assert!(err.contains("invalid agent_ref"), "got: {err}");
163    }
164
165    // The spawn-failure → "agent block: spawn failed: ..." mapping is
166    // covered by `agents::runner::tests::run_agent_with_bin_surfaces_spawn_failure`,
167    // which injects a nonexistent binary path via the internal
168    // `run_agent_with_bin` entry point instead of `std::env::set_var`
169    // (unsound under concurrent test execution in Rust 1.81+).
170    // Reagent P2 on PR #834.
171
172    /// Reproduces the parse-only path of `run()` for a node carrying
173    /// legacy `forge_agent_id` (no `agent_ref`). The runner shim isn't
174    /// invoked — the assertion is that we accept the node and fall back
175    /// to a default `AgentRef`. The deprecation warning fires as a side
176    /// effect; we keep this test free of `tracing` plumbing.
177    #[test]
178    fn legacy_forge_agent_id_falls_back_to_default_ref() {
179        let data = json!({
180            "kind": "agent",
181            "task": "hi",
182            "forge_agent_id": "legacy-id-123"
183        });
184        let agent_ref: AgentRef = match data.get("agent_ref") {
185            Some(v) => serde_json::from_value(v.clone()).unwrap(),
186            None => AgentRef::default(),
187        };
188        assert_eq!(agent_ref, AgentRef::default());
189        // Confirms the legacy field is still present in node.data — the
190        // production path reads it for the warn-log; tests don't.
191        assert_eq!(
192            data.get("forge_agent_id").and_then(|v| v.as_str()),
193            Some("legacy-id-123")
194        );
195    }
196}